package com.atguigu.app

import com.alibaba.fastjson.JSON
import com.atguigu.bean.StartUpLog
import com.atguigu.constants.GmallConstants
import com.atguigu.handler.DauHandler
import com.atguigu.utils.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.text.SimpleDateFormat
import java.util.Date

/**
 * Daily-active-user (DAU) streaming job.
 *
 * Reads start-up log JSON from the Kafka topic `GmallConstants.KAFKA_TOPIC_STARTUP` in 3-second
 * micro-batches, fills in each log's `logDate` / `logHour` from its `ts` field, then deduplicates
 * the stream twice: across batches via `DauHandler.filterByRedis` and within a batch via
 * `DauHandler.filterbyGroup`. The count and a sample of every stage are printed for inspection.
 */
object DauApp {

  /**
   * Entry point: builds the streaming pipeline and blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DauApp")
      // setMaster would override `spark-submit --master`; only fall back to local mode when no
      // master has been configured.
      .setIfMissing("spark.master", "local[*]")

    val ssc = new StreamingContext(conf, Seconds(3))

    val kafkaDStream = MyKafkaUtil.getKafkaStream(GmallConstants.KAFKA_TOPIC_STARTUP, ssc)

    // Resolve the time zone once on the driver so date/hour bucketing does not depend on each
    // executor JVM's default time zone.
    val timeZoneId = java.util.TimeZone.getDefault.getID

    val startUpLogDStream = kafkaDStream.mapPartitions(partition => {
      // SimpleDateFormat is not thread-safe: build one per partition on the executor instead of
      // sharing a single driver-side instance through the closure.
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH")
      sdf.setTimeZone(java.util.TimeZone.getTimeZone(timeZoneId))

      partition.map(record => {
        // Convert the Kafka JSON payload into the StartUpLog case class.
        val log = JSON.parseObject(record.value(), classOf[StartUpLog])

        // Fill in the derived fields; `ts` is interpreted as epoch milliseconds by java.util.Date.
        // The pattern always yields "<date> <hour>", so the split produces exactly two parts.
        val dateAndHour = sdf.format(new Date(log.ts)).split(" ")
        log.logDate = dateAndHour(0)
        log.logHour = dateAndHour(1)

        log
      })
    })

    // Consumed by several output operations below; cache so parsing runs once per batch.
    startUpLogDStream.cache()
    startUpLogDStream.count().print()
    startUpLogDStream.print()

    // Inter-batch deduplication.
    val filterByRedisDStream = DauHandler.filterByRedis(startUpLogDStream, ssc.sparkContext)
    // Consumed by count, print and the intra-batch step; cache so filterByRedis (presumably
    // backed by Redis lookups) is evaluated once per batch, not once per output operation.
    filterByRedisDStream.cache()
    filterByRedisDStream.count().print()
    filterByRedisDStream.print()

    // Intra-batch deduplication.
    val filterByGroupDStream = DauHandler.filterbyGroup(filterByRedisDStream)
    filterByGroupDStream.cache()
    filterByGroupDStream.count().print()
    filterByGroupDStream.print()

    // Save to Redis -- currently disabled.
    // NOTE(review): with this step commented out nothing in this job appears to write mids to
    // Redis, so filterByRedis may have nothing to filter against across batches. When
    // re-enabling, confirm whether the deduplicated filterByGroupDStream is the intended input.
//    DauHandler.saveMidToRedis(startUpLogDStream)

    ssc.start()
    ssc.awaitTermination()

  }
}
